-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix sort order aware file group parallelization #8517
Conversation
f2ec70c
to
d7026eb
Compare
// ordering is lost here | ||
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", | ||
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", | ||
"ParquetExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The file is now divided into groups that preserve its order and thus no resort is needed
@@ -118,7 +118,7 @@ physical_plan | |||
SortPreservingMergeExec: [column1@0 ASC NULLS LAST] | |||
--CoalesceBatchesExec: target_batch_size=8192 | |||
----FilterExec: column1@0 != 42 | |||
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 | |||
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shows the test added in #8505 is now fixed (the two files are not intermixed)
use std::collections::BinaryHeap; | ||
use std::iter::repeat_with; | ||
|
||
/// Repartition input files into `target_partitions` partitions, if total file size exceed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a bunch of comments trying to clarify what this code was supposed to be doing
/// divides into 4 groups | ||
/// ``` | ||
#[derive(Debug, Clone, Copy)] | ||
pub struct FileGroupPartitioner { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API is new, but the file distribution algorithm is the same for unordered inputs
&self, | ||
file_groups: &[Vec<PartitionedFile>], | ||
) -> Option<Vec<Vec<PartitionedFile>>> { | ||
let target_partitions = self.target_partitions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the old algorithm, unmodified
mod test { | ||
use super::*; | ||
|
||
/// Empty file won't get partitioned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first set of tests are the original tests, though I refactored them so they didn't rely on ParquetExec
} | ||
|
||
#[test] | ||
fn repartition_ordered_no_action_too_few_partitions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New tests start here
pub fn repartition_file_groups( | ||
file_groups: Vec<Vec<PartitionedFile>>, | ||
target_partitions: usize, | ||
repartition_file_min_size: usize, | ||
) -> Option<Vec<Vec<PartitionedFile>>> { | ||
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored into datafusion/core/src/datasource/physical_plan/file_groups.rs
@@ -809,345 +810,4 @@ mod tests { | |||
extensions: None, | |||
} | |||
} | |||
|
|||
/// Unit tests for `repartition_file_groups()` | |||
#[cfg(feature = "parquet")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests were moved and refactored into datafusion/core/src/datasource/physical_plan/file_groups.rs
@@ -3862,6 +3863,56 @@ pub(crate) mod tests { | |||
Ok(()) | |||
} | |||
|
|||
#[test] | |||
fn parallelization_multiple_files() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test fails on main
d7026eb
to
0b558bc
Compare
Thank you for the review @Dandandan |
Draft as it builds on #8505Which issue does this PR close?
Closes #8451
Rationale for this change
Repatitioning data for pre-sorted listing tables can sometimes result in incorrect results. See descriptions on #8451 and in the comments in this PR for details
What changes are included in this PR?
Are these changes tested?
Yes, new unit tests and end to end coverage (updates to #8505)
Are there any user-facing changes?
Correct answers with pre-sorted data